From 936d91fbba790389cdf6f438f93888c0dbf09fd8 Mon Sep 17 00:00:00 2001 From: "cl349@firebug.cl.cam.ac.uk" Date: Fri, 17 Jun 2005 16:23:55 +0000 Subject: [PATCH] bitkeeper revision 1.1713.3.8 (42b2f91bG45uUFWHhUaUha3e1OAxJQ) xsnode.py: Updated watches/event code from Mike Wray. Signed-off-by: Mike Wray Signed-off-by: Christian Limpach --- tools/python/xen/xend/xenstore/xsnode.py | 219 +++++++++++++++-------- 1 file changed, 141 insertions(+), 78 deletions(-) diff --git a/tools/python/xen/xend/xenstore/xsnode.py b/tools/python/xen/xend/xenstore/xsnode.py index ae770219ab..f9721caf1b 100644 --- a/tools/python/xen/xend/xenstore/xsnode.py +++ b/tools/python/xen/xend/xenstore/xsnode.py @@ -2,7 +2,9 @@ import errno import os import os.path import select +import socket import sys +import threading import time from xen.lowlevel import xs @@ -12,18 +14,26 @@ from xen.xend.PrettyPrint import prettyprint SELECT_TIMEOUT = 2.0 def getEventPath(event): - return os.path.join("/_event", event) + if event and event.startswith("/"): + event = event[1:] + return os.path.join("/event", event) def getEventIdPath(event): - return os.path.join(eventPath(event), "@eid") + return os.path.join(getEventPath(event), "@eid") class Subscription: - def __init__(self, event, fn, id): - self.event = event + def __init__(self, path, fn, sid): + self.path = path self.watcher = None self.fn = fn - self.id = id + self.sid = sid + + def getPath(self): + return self.path + + def getSid(self): + return self.sid def watch(self, watcher): self.watcher = watcher @@ -34,10 +44,11 @@ class Subscription: if watcher: self.watcher = None watcher.delSubs(self) + return watcher - def notify(self, event): + def notify(self, path, val): try: - self.fn(event, id) + self.fn(self, path, val) except SystemExitException: raise except: @@ -45,45 +56,45 @@ class Subscription: class Watcher: - def __init__(self, store, event): - self.path = getEventPath(event) - self.eidPath = getEventIdPath(event) + def __init__(self, store, path): + self.path = path store.mkdirs(self.path) - if not store.exists(self.eidPath): - store.writeInt(self.eidPath, 0) self.xs = None - self.subs = [] + self.subscriptions = [] - def __getattr__(self, k, v): - if k == "fileno": - if self.xs: - return self.xs.fileno - else: - return -1 + def fileno(self): + if self.xs: + return self.xs.fileno else: - return self.__dict__.get(k, v) + return -1 + + def getPath(self): + return self.path def addSubs(self, subs): - self.subs.append(subs) + self.subscriptions.append(subs) self.watch() def delSubs(self, subs): - self.subs.remove(subs) - if len(self.subs) == 0: + self.subscriptions.remove(subs) + if len(self.subscriptions) == 0: self.unwatch() - def getEvent(self): - return self.event - def watch(self): if self.xs: return self.xs = xs.open() - self.xs.watch(path) + self.xs.watch(self.path) def unwatch(self): if self.xs: - self.xs.unwatch(self.path) - self.xs.close() + try: + self.xs.unwatch(self.path) + except Exception, ex: + print 'Watcher>unwatch>', ex + try: + self.xs.close() + except Exception, ex: + pass self.xs = None def watching(self): @@ -92,22 +103,38 @@ class Watcher: def getNotification(self): p = self.xs.read_watch() self.xs.acknowledge_watch() - eid = self.xs.readInt(self.eidPath) return p - def notify(self, subs): - p = self.getNotification() - for s in subs: - s.notify(p) - + def notify(self): + try: + p = self.getNotification() + v = self.xs.read(p) + for s in subscriptions: + s.notify(p, v) + except Exception, ex: + print 'Notify exception:', ex + +class EventWatcher(Watcher): + + def __init__(self, store, path, event): + Watcher.__init__(self, store, path) + self.event = event + self.eidPath = getEventIdPath(event) + if not store.exists(self.eidPath): + store.write(self.eidPath, str(0)) + + def getEvent(self): + return self.event + class XenStore: + xs = None + watchThread = None + subscription_id = 1 + def __init__(self): - self.xs = None - #self.xs = xs.open() - self.subscription = {} - self.subscription_id = 0 - self.events = {} + self.subscriptions = {} + self.watchers = {} self.write("/", "") def getxs(self): @@ -119,8 +146,8 @@ class XenStore: ex = None break except Exception, ex: - print >>stderr, "Exception connecting to xsdaemon:", ex - print >>stderr, "Trying again..." + print >>sys.stderr, "Exception connecting to xsdaemon:", ex + print >>sys.stderr, "Trying again..." time.sleep(1) else: raise ex @@ -217,70 +244,85 @@ class XenStore: self.getxs().write(path, data, create=create, excl=excl) def begin(self, path): - self.getxs().begin_transaction(path) + self.getxs().transaction_start(path) def commit(self, abandon=False): - self.getxs().end_transaction(abort=abandon) + self.getxs().transaction_end(abort=abandon) + + def watch(self, path, fn): + watcher = self.watchers.get(path) + if not watcher: + watcher = self.addWatcher(Watcher(self, path)) + return self.addSubscription(watcher, fn) + + def unwatch(self, sid): + s = self.subscriptions.get(sid) + if not s: return + del self.subscriptions[s.sid] + watcher = s.unwatch() + if watcher and not watcher.watching(): + del self.watchers[path] def subscribe(self, event, fn): - watcher = self.watchEvent(event) - self.subscription_id += 1 - subs = Subscription(event, fn, self.subscription_id) - self.subscription[subs.id] = subs - subs.watch(watcher) - return subs.id + path = getEventPath(event) + watcher = self.watchers.get(path) + if not watcher: + watcher = self.addWatcher(EventWatcher(self, path, event)) + return self.addSubscription(watcher, fn) - def unsubscribe(self, sid): - s = self.subscription.get(sid) - if not s: return - del self.subscription[s.id] - s.unwatch() - unwatchEvent(s.event) + unsubscribe = unwatch def sendEvent(self, event, data): eventPath = getEventPath(event) eidPath = getEventIdPath(event) try: - self.begin(eventPath) + #self.begin(eventPath) self.mkdirs(eventPath) + eid = 1 if self.exists(eidPath): - eid = self.readInt(eidPath) - eid += 1 - else: - eid = 1 - self.writeInt(eidPath, eid) + data = self.read(eidPath) + print 'sendEvent>', 'data=', data, type(data) + try: + eid = int(self.read(eidPath)) + eid += 1 + except Exception, ex: + print 'sendEvent>', ex + pass + self.write(eidPath, str(eid)) self.write(os.path.join(eventPath, str(eid)), data) finally: - self.commit() + #self.commit() + pass - def watchEvent(self, event): - if event in self.events: - return - watcher = Watcher(event) - self.watchers[watcher.getEvent()] = watcher + def addWatcher(self, watcher): + self.watchers[watcher.getPath()] = watcher self.watchStart() return watcher - def unwatchEvent(self, event): - watcher = self.watchers.get(event) - if not watcher: - return - if not watcher.watching(): - del self.watchers[event] + def addSubscription(self, watcher, fn): + self.subscription_id += 1 + subs = Subscription(watcher.getPath(), fn, self.subscription_id) + self.subscriptions[subs.sid] = subs + subs.watch(watcher) + return subs.sid def watchStart(self): if self.watchThread: return - + self.watchThread = threading.Thread(name="Watcher", + target=self.watchMain) + self.watchThread.setDaemon(True) + self.watchThread.start() + def watchMain(self): try: while True: if self.watchThread is None: return - if not self.events: + if not self.watchers: return rd = self.watchers.values() try: - (rd, wr, er) = select.select(rd, [], [], SELECT_TIMEOUT) - for watcher in rd: + (srd, swr, ser) = select.select(rd, [], [], SELECT_TIMEOUT) + for watcher in srd: watcher.notify() except socket.error, ex: if ex.args[0] in (EAGAIN, EINTR): @@ -315,6 +357,9 @@ class XenNode: else: raise ValueError("path does not exist: '%s'" % path) + def getStore(self): + return self.store + def relPath(self, path=""): if not path: return self.path @@ -376,6 +421,24 @@ class XenNode: def releaseDomain(self, dom): self.store.releaseDomain(dom) + def watch(self, fn, path=""): + """Watch a path for changes. The path is relative + to the node and defaults to the node itself. + """ + return self.store.watch(self.relPath(path), fn) + + def unwatch(self, sid): + return self.store.unwatch(sid) + + def subscribe(self, event, fn): + return self.store.subscribe(event, fn) + + def unsubscribe(self, sid): + self.store.unsubscribe(sid) + + def sendEvent(self, event, data): + return self.store.sendEvent(event, data) + def __repr__(self): return "" % self.path -- 2.30.2